package com.imyuanma.qingyun.common.client.monitor.trace;

import com.imyuanma.qingyun.common.core.concurrent.QingYunThreadPoolExecutor;
import com.imyuanma.qingyun.common.util.CollectionUtil;
import com.imyuanma.qingyun.interfaces.monitor.service.IMonitorTraceOutService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
 * 链路数据资源库
 *
 * @author wangjy
 * @date 2022/09/19 23:33:43
 */
@Component
public class TraceDataRepository {
    private static final Logger logger = LoggerFactory.getLogger(TraceDataRepository.class);
    /**
     * 日志队列,用于存放日志信息
     * 基于链表的阻塞队列,使用2把锁来分别管理读和写,并发性能更好.
     */
    private static final LinkedBlockingQueue<TraceMeta> TRACE_META_QUEUE = new LinkedBlockingQueue<>(5000);
    /**
     * 链路数据消费线程池
     */
    private static final QingYunThreadPoolExecutor CONSUME_EXECUTOR = QingYunThreadPoolExecutor.build("traceDataConsumePool");
    /**
     * 链路外兑服务
     */
    @Autowired
    private IMonitorTraceOutService monitorTraceOutService;


    /**
     * 存储链路数据
     *
     * @param traceMeta 链路数据
     * @return 有空余位置则插入并返回true, 否则直接返回false
     */
    public static boolean offer(TraceMeta traceMeta) {
        boolean success = TRACE_META_QUEUE.offer(traceMeta);
        if (!success) {
            logger.info("[存储链路数据] 链路数据存储队列已满,添加数据失败");
        }
        return success;
    }

    @PostConstruct
    private void init() {
        logger.info("[消费链路数据任务] 任务启动");
        new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    int cap = 10;
                    long start = System.currentTimeMillis();
                    List<TraceMeta> list = new ArrayList<>(cap);
                    // 批量抓取
                    TRACE_META_QUEUE.drainTo(list, cap);
                    // 有数据, 则保存
                    if (CollectionUtil.isNotEmpty(list)) {
                        CONSUME_EXECUTOR.execute(() -> this.save(list));
                    }
                    // 若满载, 则继续下一轮, 否则, 等待1s继续下一轮
                    if (list.size() < cap) {
                        Thread.sleep(5000L);
                    }
                } catch (Throwable e) {
                    logger.error("[消费链路数据任务] 链路数据消费出现异常:", e);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException ex) {
                        logger.error("[消费链路数据任务] 数据消费异常后执行睡眠操作发生异常:", ex);
                    }
                }
            }
            logger.error("[消费链路数据任务] 消费中断!");
        }).start();
    }

    /**
     * 保存链路数据
     *
     * @param list
     */
    private void save(List<TraceMeta> list) {
        if (CollectionUtil.isEmpty(list)) {
            return;
        }
        monitorTraceOutService.batchSaveTraceLog(list.stream().map(TraceMeta::getTraceDTO).collect(Collectors.toList()));
    }
}
